package com.atguigu.gmall.realtime.app.dwd.db;

import com.atguigu.gmall.realtime.utils.KafkaUtil;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;

/**
 * @author 黄凯
 * @date 2023/7/10
 * 交易域：加购事实表
 * 需要启动的进程
 *      zk、kafka、maxwell、DwdTradeCartAdd
 */
public class DwdTradeCartAdd {
    public static void main(String[] args) {
        //TODO 1.基本环境准备
        //1.1 指定流处理环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        //1.2 设置并行度
        env.setParallelism(4);
        //1.3 指定表执行环境
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        //TODO 2.检查点相关的设置(略)
        //TODO 3.从kafka的topic_db主题中读取数据 创建动态表
        String groupId = "dwd_trade_cart_add_group";
        tableEnv.executeSql(KafkaUtil.getTopicDbDDL(groupId));
        // tableEnv.executeSql("select * from topic_db").print();

        //TODO 4.过滤出加购行为
        Table cartInfo = tableEnv.sqlQuery("select \n" +
            "    `data`['id'] id,\n" +
            "     `data`['user_id'] user_id,\n" +
            "     `data`['sku_id'] sku_id,\n" +
            "     if(`type`='insert',`data`['sku_num'],cast ((CAST(`data`['sku_num'] AS INT) - CAST(`old`['sku_num'] AS INT)) as string)) sku_num,\n" +
            "     ts\n" +
            " from topic_db\n" +
            " where `table`='cart_info' and (`type`='insert' \n" +
            "     or (`type` ='update' and `old`['sku_num'] is not null \n" +
            "     and CAST(`data`['sku_num'] AS INT) > CAST(`old`['sku_num'] AS INT)))");
        tableEnv.createTemporaryView("cart_info",cartInfo);
        // tableEnv.executeSql("select * from cart_info").print();

        //TODO 5.将过滤出来的加购数据写到kafka的主题
        //5.1 创建动态表 和要写入的主题进行映射
        tableEnv.executeSql("CREATE TABLE dwd_trade_cart_add (\n" +
            "  id string,\n" +
            "  user_id string,\n" +
            "  sku_id string,\n" +
            "  sku_num string,\n" +
            "  ts string,\n" +
            "  PRIMARY KEY (id) NOT ENFORCED\n" +
            ") " + KafkaUtil.getUpsertKafkaDDL("dwd_trade_cart_add"));
        //5.2 写入
        tableEnv.executeSql("insert into dwd_trade_cart_add select * from cart_info");
    }
}
